package org.springboot.rocketmq.controller;

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springboot.rocketmq.config.RocketMqOriginalConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rocket")
public class RocketController {

	private static final Logger logger = LoggerFactory.getLogger(RocketController.class);
	
	@Autowired
	RocketMQTemplate rocketMQTemplate;
	
	@Autowired
	@Qualifier("custMQTemplate")
	RocketMQTemplate custMQTemplate;
	
	@Autowired
	private RocketMqOriginalConfig rocketMqOriginalConfig;

	@RequestMapping(value = "/send")
    public String send(String message) {
		if (ObjectUtils.isEmpty(message)) {
			return "Please enter message!";
		}
		if ("orgn".equals(message)) {
			// 原生方式消息发送
			rocketMqOriginalConfig.send("test_boot_orgn", "tag_orgn", message + " message!");
			return "orgn success";
		}
		if ("cust".equals(message)) {
			String destination = "test_boot_cust:tag_cust";
			// 自定义 custMQTemplate
	        custMQTemplate.convertAndSend(destination, message + " message!");
			return "cust success";
		}
		// 终点：即消息的键。由 topic 和 tag （可无）通过:连接而成，例 topic:tag、topic
        String destination = "test_boot_auto:tag_auto";
		if ("sync".equals(message)) {
			// 同步发送
			syncSend(destination, message + " message!");
		} else if ("async".equals(message)) {
			// 同步发送
			asyncSend(destination, message + " message!");
		} else if ("oneWay".equals(message)) {
			// 单向发送，适用于可靠性要求不高的场景
			sendOneWay(destination, message + " message!");
		} else if ("order".equals(message)) {
			// 顺序发送
			orderSend(destination, message + " message!");
		} else if ("batch".equals(message)) {
			// 批量发送
			batchSend(destination, message + " message!");
		} else if ("schedule".equals(message)) {
			// 延时发送
			scheduleSend(destination, message + " message!");
		} else {
			return "Message error: " + message;
		}
		return "auto success";
	}

    private void syncSend(String destination, Object message) {
        // 方法一，操作方便
        SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
        // 方法二，是方法一的底层方法，设置 header 可用于消费者sql过滤选择消费 （value = 1）
        sendResult = rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload(message).setHeader("value", 1).build());
        logger.info("同步消息发送完成，" + sendResult.getSendStatus());
    }

    private void asyncSend(String destination, Object message) {
        // 发送回调接口对象：用于异步处理发送结果
        SendCallback sendCallback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
            	logger.info("异步消息发送完成，" + sendResult.getSendStatus());
            }
            
            @Override
            public void onException(Throwable throwable) {
                // 发送失败时执行逻辑。一般是将失败数据入库，采用定时的方式尝试再次发送
            	logger.error("异步消息发送失败！", throwable);
            }
        };
        // 方法一，操作方便
        rocketMQTemplate.asyncSend(destination, message, sendCallback);
        // 方法二，是方法一的底层方法
        rocketMQTemplate.asyncSend(destination, MessageBuilder.withPayload(message).build(), sendCallback);
    }

    private void sendOneWay(String destination, Object message) {
        // 方法一，操作方便
    	rocketMQTemplate.sendOneWay(destination, message);
        // 方法二，是方法一的底层方法
    	rocketMQTemplate.sendOneWay(destination, MessageBuilder.withPayload(message).build());
    }

    private void orderSend(String destination, Object message) {
    	// 通过 key 保证消息发送到一个队列上来保证消费顺序
        String key = "key_auto";
        // 同步顺序发送
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, "sync " + message, key);
        logger.info("同步顺序消息发送完成，" + sendResult.getSendStatus());

        // 异步顺序发送
        SendCallback sendCallback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
            	logger.info("异步顺序消息发送完成，" + sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                // 发送失败时执行逻辑。一般是将失败数据入库，采用定时的方式尝试再次发送
            	logger.error("异步顺序消息发送失败！", throwable);
            }
        };
        rocketMQTemplate.asyncSendOrderly(destination, "async " + message, key, sendCallback);

        // 单向顺序发送
        rocketMQTemplate.sendOneWayOrderly(destination, "oneWay " + message, key);
    }

    private void batchSend(String destination, Object message) {
        // 消息列表
        List<Message<String>> messages = new ArrayList<>();
        messages.add(MessageBuilder.withPayload("one " + message).build());
        messages.add(MessageBuilder.withPayload("two " + message).build());
        
        // 同步批量发送
        SendResult sendResult = rocketMQTemplate.syncSend(destination, messages);
        logger.info("同步批量消息发送完成，" + sendResult.getSendStatus());
        
        // 异步批量发送
        SendCallback sendCallback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
            	logger.info("异步批量消息发送完成，" + sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                // 发送失败时执行逻辑。一般是将失败数据入库，采用定时的方式尝试再次发送
            	logger.error("异步批量消息发送失败！", throwable);
            }
        };
        rocketMQTemplate.asyncSend(destination, messages, sendCallback);

        // 单向发送无批量发送功能
    }

    /**
     * 延时发送
     * @param string 
     * @param destination2 
     */
    private void scheduleSend(String destination, Object message) {
    	// 超时时间：超出该时间后会报超时异常。
        long timeout = 3000L;
        // 延时等级：1～18，分别延时1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        // 发消息时，设置 delayLevel 等级即可：msg.setDelayLevel(level)。level 有以下三种情况：
        // level == 0，消息为非延迟消息
        // 1 <= level <= maxLevel，消息延迟特定时间，例如 level == 1，延迟 1s
        // level > maxLevel，则 level == maxLevel，例如 level == 20，延迟 2h
        int delayLevel = 3;
        // 同步延时发送
        SendResult sendResult = rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload("sync " + message).build(), timeout, delayLevel);
        logger.info("同步延时消息发送完成，" + sendResult.getSendStatus());

        // 异步延时发送
        SendCallback sendCallback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
            	logger.info("异步延时消息发送完成，" + sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                // 发送失败时执行逻辑。一般是将失败数据入库，采用定时的方式尝试再次发送
            	logger.error("异步延时消息发送失败！", throwable);
            }
        };
        rocketMQTemplate.asyncSend(destination, MessageBuilder.withPayload("async " + message).build(), sendCallback, timeout, delayLevel);
        
        // 单向发送无延时发送功能
    }

}
